/*
 *
 *  * Unidata Platform
 *  * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *  *
 *  * Commercial License
 *  * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *  *
 *  * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 *  * For clarification or additional options, please contact: info@unidata-platform.com
 *  * -------
 *  * Disclaimer:
 *  * -------
 *  * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 *  * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 *  * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 *  * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 *  * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 *
 */

package org.unidata.mdm.data.service.segments.relations.draft;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.context.ValidityRangeContext;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.calculables.CalculableHolder;
import org.unidata.mdm.core.type.formless.BundlesArray;
import org.unidata.mdm.core.type.formless.DataBundle;
import org.unidata.mdm.core.type.timeline.MutableTimeInterval;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.GetRelationTimelineRequestContext;
import org.unidata.mdm.data.context.ReadWriteTimelineContext;
import org.unidata.mdm.data.context.RelationIdentityContext;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.impl.CommonRelationsComponent;
import org.unidata.mdm.data.service.segments.RelationDraftTimelineSupport;
import org.unidata.mdm.data.service.segments.ValidityRangeCheckSupport;
import org.unidata.mdm.data.type.data.OriginRelation;
import org.unidata.mdm.data.type.data.OriginRelationInfoSection;
import org.unidata.mdm.data.type.data.RelationType;
import org.unidata.mdm.data.type.draft.DataDraftConstants;
import org.unidata.mdm.data.type.draft.DataDraftOperation;
import org.unidata.mdm.data.type.keys.RelationKeys;
import org.unidata.mdm.draft.context.DraftUpsertContext;
import org.unidata.mdm.draft.service.DraftService;
import org.unidata.mdm.draft.type.Draft;
import org.unidata.mdm.draft.type.Edition;
import org.unidata.mdm.system.type.pipeline.Point;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.support.IdentityHashSet;
import org.unidata.mdm.system.type.variables.Variables;

/**
 * Pipeline point, which processes relation payload of a draft upsert
 * and prepares the next draft {@link Edition} from it.
 * <p>
 * Containment relations are not snapshotted here (see the comment in {@link #point(DraftUpsertContext)}).
 * For all other relation types the content of the edition is a {@link BundlesArray},
 * holding one {@link DataBundle} per calculable holder of the relation timeline.
 *
 * @author Alexey Tsarapkin
 */
@Component(RelationDraftUpsertProcessExecutor.SEGMENT_ID)
public class RelationDraftUpsertProcessExecutor extends Point<DraftUpsertContext>
    implements ValidityRangeCheckSupport<UpsertRequestContext>, RelationDraftTimelineSupport {
    /**
     * This segment id.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RELATION_DRAFT_UPSERT_PROCESS]";
    /**
     * This segment description.
     */
    private static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".relation.draft.upsert.process.description";
    /**
     * The draft service.
     */
    @Autowired
    private DraftService draftService;
    /**
     * Common relations component.
     */
    @Autowired
    private CommonRelationsComponent commonRelationsComponent;
    /**
     * The MMS.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * Constructor.
     */
    public RelationDraftUpsertProcessExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void point(DraftUpsertContext ctx) {

        // 1. The stuff runs only, if some data were supplied
        if (!ctx.hasPayload()) {
            return;
        }

        Draft draft = ctx.currentDraft();

        // Don't save state for containments.
        // This is handled by records entirely
        RelationIdentityContext ric = ctx.getPayload();
        if (ric.relationType() == RelationType.CONTAINS) {

            // Only an empty initial edition is supplied, if the draft has no editions yet
            if (!draft.hasEditions()) {
                ctx.currentEdition(newEdition(new BundlesArray()));
            }

            return;
        }

        // 2. Load last edition
        Edition current = null;
        if (draft.isExisting()) {
            current = draftService.current(draft.getDraftId(), true);
        }

        // 3. Take data snapshot for new drafts, if no editions exist
        BundlesArray bundles = Objects.isNull(current)
                ? handleBase(draft, ctx)
                : current.getContent();

        // 4. Allow state operations take a snapshot (to create initial edition) and suppress further processing
        DataDraftOperation operation = draft
                .getVariables()
                .valueGet(DataDraftConstants.OPERATION_CODE, DataDraftOperation.class);

        if (operation == DataDraftOperation.DELETE_RECORD
         || operation == DataDraftOperation.RESTORE_RECORD) {

            if (current == null) {
                ctx.currentEdition(newEdition(bundles));
            }

            return;
        }

        // 5. Create bundles array, if needed (no editions and the record is NEW)
        if (Objects.isNull(bundles)) {
            bundles = new BundlesArray();
        }

        // 6. Process user input and add the payload to collection
        bundles.addAll(handleInput(ctx));

        Edition next = newEdition(bundles);

        // 7. Cleanse the bundle - remove invisible periods and save it to edition
        cleanseBundles(draft, next);

        ctx.currentEdition(next);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return DraftUpsertContext.class.isAssignableFrom(start.getInputTypeClass());
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CommonRelationsComponent getCommonRelationsComponent() {
        return commonRelationsComponent;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public MetaModelService getMetaModelService() {
        return metaModelService;
    }
    /**
     * Takes the initial snapshot of the relation timeline for a draft without editions.
     *
     * @param draft the draft being processed
     * @param ctx the upsert context, whose payload supplies the relation keys
     * @return bundles for all calculable holders of the loaded timeline (marked as non-draft records)
     * or null, if the draft is flagged as a new record or the loaded timeline is empty
     */
    protected BundlesArray handleBase(Draft draft, DraftUpsertContext ctx) {

        // Null-safe check: an absent flag is treated as "not new"
        // instead of failing with NPE while unboxing.
        Object isNew = draft.getVariables().valueGet(DataDraftConstants.IS_NEW_RECORD);
        if (Boolean.TRUE.equals(isNew)) {
            return null;
        }

        RelationIdentityContext original = ctx.getPayload();

        // The TL is loaded from persistent storage for the first time snapshot,
        // since some operations (such as restore) reduce TL
        RelationKeys keys = original.relationKeys();
        Timeline<OriginRelation> timeline = commonRelationsComponent.loadTimeline(GetRelationTimelineRequestContext.builder()
                .relationEtalonKey(keys.getEtalonKey().getId())
                .relationLsn(keys.getEtalonKey().getLsn())
                .relationShard(keys.getShard())
                .build());

        if (timeline.isEmpty()) {
            return null;
        }

        BundlesArray result = new BundlesArray();
        for (CalculableHolder<OriginRelation> ch : uniqueCalculables(timeline)) {
            result.add(toBundle(ch, Boolean.FALSE));
        }

        return result;
    }
    /**
     * Converts user input (modifications of the next timeline, reduced by the validity range of the payload)
     * to bundles, marked as draft records.
     *
     * @param ctx the upsert context, whose payload supplies both the next timeline and the validity range
     * @return list of bundles, possibly empty
     */
    protected List<DataBundle> handleInput(DraftUpsertContext ctx) {

        // The same payload object is viewed through two of its interfaces
        ReadWriteTimelineContext<OriginRelation> source = ctx.getPayload();
        ValidityRangeContext vrc = ctx.getPayload();

        Timeline<OriginRelation> next = source.nextTimeline()
                .reduceBy(vrc.getValidFrom(), vrc.getValidTo());

        // Identity set is used to process each holder instance just once
        Set<CalculableHolder<OriginRelation>> supply = next.stream()
                .map(TimeInterval::unlock)
                .map(MutableTimeInterval::toModifications)
                .map(Map::values)
                .flatMap(Collection::stream)
                .flatMap(Collection::stream)
                .collect(Collectors.toCollection(IdentityHashSet::new));

        List<DataBundle> bundles = new ArrayList<>(supply.size());
        for (CalculableHolder<OriginRelation> ch : supply) {
            bundles.add(toBundle(ch, Boolean.TRUE));
        }

        return bundles;
    }
    /**
     * Rebuilds the content of the edition from the timeline, calculated for the draft and the edition,
     * so that only holders, which are still part of that timeline, remain in the content.
     * Content with one or no bundles is left untouched.
     *
     * @param draft the draft
     * @param edition the edition, whose content is rewritten in place
     */
    protected void cleanseBundles(Draft draft, Edition edition) {

        BundlesArray bundles = edition.getContent();
        if (Objects.isNull(bundles) || bundles.size() <= 1) {
            return;
        }

        // The timeline must be calculated BEFORE the content is cleared
        Timeline<OriginRelation> timeline = timeline(draft, edition);
        Set<CalculableHolder<OriginRelation>> unique = uniqueCalculables(timeline);

        bundles.clear();
        for (CalculableHolder<OriginRelation> ch : unique) {
            // Holders with zero revision are marked as draft records
            bundles.add(toBundle(ch, ch.getRevision() == 0 ? Boolean.TRUE : Boolean.FALSE));
        }
    }
    /**
     * Creates a new edition with the given content, stamped with current time and current user name.
     *
     * @param content the content, may be null
     * @return new edition
     */
    private static Edition newEdition(BundlesArray content) {

        Edition edition = new Edition();
        edition.setContent(content);
        edition.setCreateDate(new Date());
        edition.setCreatedBy(SecurityUtils.getCurrentUserName());

        return edition;
    }
    /**
     * Collects calculable holders of all intervals of the timeline to an identity based set.
     *
     * @param timeline the timeline
     * @return set of holders
     */
    private static Set<CalculableHolder<OriginRelation>> uniqueCalculables(Timeline<OriginRelation> timeline) {
        return timeline.stream()
                .map(TimeInterval::unlock)
                .map(MutableTimeInterval::toCalculables)
                .flatMap(Collection::stream)
                .collect(Collectors.toCollection(IdentityHashSet::new));
    }
    /**
     * Creates a bundle from a calculable holder.
     * This is the single place, where the set of bundle variables is defined.
     *
     * @param ch the holder
     * @param draftRecord value of the {@link DataDraftConstants#IS_DRAFT_RECORD} variable
     * @return bundle
     */
    private static DataBundle toBundle(CalculableHolder<OriginRelation> ch, Boolean draftRecord) {

        OriginRelationInfoSection is = ch.getValue().getInfoSection();

        // Open validity boundaries are represented by nulls
        Instant from = ch.getValidFrom() == null ? null : ch.getValidFrom().toInstant();
        Instant to = ch.getValidTo() == null ? null : ch.getValidTo().toInstant();
        Instant ts = is.getUpdateDate().toInstant();

        return new DataBundle()
                .withRecord(ch.getValue())
                .withVariables(new Variables()
                        .add(DataDraftConstants.BOX_KEY, ch.toBoxKey())
                        .add(DataDraftConstants.REVISION, ch.getRevision())
                        .add(DataDraftConstants.VALID_FROM, from)
                        .add(DataDraftConstants.VALID_TO, to)
                        .add(DataDraftConstants.IS_DRAFT_RECORD, draftRecord)
                        .add(DataDraftConstants.OPERATION_TYPE, is.getOperationType())
                        .add(DataDraftConstants.PERIOD_STATUS, is.getStatus())
                        .add(DataDraftConstants.UPDATE_TIMESTAMP, ts)
                        .add(DataDraftConstants.UPDATED_BY, is.getUpdatedBy()));
    }
}
